#include "lsv_rcache.h"
#include "lsv_log.h"
#include "lsv_volume.h"
#include "volume_proto.h"
#include "list.h"

/*
 * Put a page unit into its initial, unlinked state.
 *
 * Initialises the unit's read/write lock and makes each of its three
 * list hooks (LRU, chunk, hash index) an empty list head.
 *
 * Always returns 0.
 */
int lsv_rcache_pu_init(lsv_rcache_page_unit *pu)
{
        lsv_rwlock_init(&pu->rwlock);

        INIT_LIST_HEAD(&pu->index_list);
        INIT_LIST_HEAD(&pu->chunk_list);
        INIT_LIST_HEAD(&pu->page_lru_hook);

        return 0;
}

/*
 * Allocate and initialise a new page unit for the volume's read cache.
 *
 * @volume_proto: volume whose rcache page counter is bumped on success.
 * @pu:           out parameter receiving the freshly allocated unit.
 *
 * Returns 0 on success, or -EINVAL when ymalloc() reports a failure
 * (the allocator's own error code is not propagated).
 */
int lsv_rcache_pu_malloc(lsv_volume_proto_t *volume_proto, lsv_rcache_page_unit **pu)
{
        lsv_rcache_t *cache = (lsv_rcache_t *)volume_proto->rcache;
        lsv_s32_t err = ymalloc((void **)pu, sizeof(lsv_rcache_page_unit));

        if (unlikely(err)) {
                DERROR("malloc for lsv_rcache_page_unit failed\n");
                return -EINVAL;
        }

        /* Account for the new unit before handing it out. */
        cache->page_num++;
        lsv_rcache_pu_init(*pu);

        return err;
}

/*
 * Record which on-disk page a page unit describes.
 *
 * @volume_proto: unused.
 * @pu:           page unit to update.
 * @chunk_id:     chunk holding the page.
 * @chunk_off:    byte offset of the page inside the chunk; must be a
 *                multiple of LSV_PAGE_SIZE.
 * @lba:          logical address of the page; must be a multiple of
 *                LSV_PAGE_SIZE.
 * @size:         unused; the unit's size is always set to LSV_PAGE_SIZE.
 *
 * Always returns 0.
 */
int lsv_rcache_pu_set(lsv_volume_proto_t *volume_proto, lsv_rcache_page_unit *pu, lsv_u32_t chunk_id, lsv_s32_t chunk_off, lsv_u64_t lba, lsv_s32_t size)
{
        (void) size;
        (void) volume_proto;

        YASSERT(lba % LSV_PAGE_SIZE == 0);
        YASSERT(chunk_off % LSV_PAGE_SIZE == 0);

        pu->chunk_id = chunk_id;
        pu->chunk_off = chunk_off;
        pu->lba = lba;
        pu->size = LSV_PAGE_SIZE;

        return 0;
}

/*
 * Obtain a page unit that the caller may fill with new data.
 *
 * While the cache holds fewer than LSV_RCACHE_PAGE_NUM_MAX units a new
 * one is allocated. Otherwise page_lru is scanned from its tail towards
 * its head and the first unit whose status is not LSV_RCACHE_LOADING is
 * recycled via lsv_rcache_page_list_clear().
 *
 * @volume_proto: volume owning the read cache.
 * @pu:           out parameter. Set to the obtained unit, or to NULL when
 *                none is available (allocation failed, or every unit on
 *                the LRU is currently loading).
 *
 * Returns 0, or the error from lsv_rcache_pu_malloc(). Note that 0 is also
 * returned when no unit could be recycled; callers must check *pu.
 */
int lsv_rcache_pu_get(lsv_volume_proto_t *volume_proto, lsv_rcache_page_unit **pu)
{
        int ret = 0;
        lsv_rcache_page_unit *tpu = NULL;
        lsv_rcache_t *rcache = (lsv_rcache_t *)volume_proto->rcache;

        /*
         * Start from a known value so that "*pu == NULL" reliably means
         * "nothing obtained", whatever the caller's variable held before.
         */
        *pu = NULL;

        if (rcache->page_num < LSV_RCACHE_PAGE_NUM_MAX) {
                ret = lsv_rcache_pu_malloc(volume_proto, &tpu);
                if (unlikely(ret)) {
                        DERROR("malloc for lsv_rcache_page_unit failed\n");
                        return ret;
                }
                *pu = tpu;
        } else {
                struct list_head *pos, *n;

                list_for_each_prev_safe(pos, n, &rcache->page_lru) {
                        tpu = list_entry(pos, lsv_rcache_page_unit, page_lru_hook);

                        /* Units with a load in flight must not be reused. */
                        if (tpu->status != LSV_RCACHE_LOADING) {
                                *pu = tpu;
                                lsv_rcache_page_list_clear(volume_proto, tpu);
                                break;
                        }
                }
        }

        return ret;
}

/*
 * Fill a page unit from a chunk-sized buffer and publish it in the index.
 *
 * Copies LSV_PAGE_SIZE bytes starting at buf + chunk_offset into pu->buf,
 * records the page's location with lsv_rcache_pu_set() and then calls
 * lsv_rcache_page_hash_index_insert() for the unit.
 *
 * @volume_proto: volume owning the read cache.
 * @pu:           destination page unit.
 * @chunk_id:     chunk the page belongs to.
 * @chunk_offset: byte offset of the page inside buf; must be a multiple
 *                of LSV_PAGE_SIZE.
 * @lba:          logical address of the page; must be a multiple of
 *                LSV_PAGE_SIZE.
 * @size:         unused; exactly one page is always copied.
 * @buf:          source buffer, indexed by chunk_offset.
 *
 * Always returns 0.
 */
int lsv_rcache_pu_memcpy(lsv_volume_proto_t *volume_proto, lsv_rcache_page_unit *pu, lsv_u32_t chunk_id,
                         lsv_s32_t chunk_offset, lsv_u64_t lba, lsv_s32_t size, lsv_s8_t * buf)
{
        lsv_s8_t *src;

        YASSERT(lba % LSV_PAGE_SIZE == 0);
        YASSERT(chunk_offset % LSV_PAGE_SIZE == 0);

        (void) size;

        src = (lsv_s8_t *)buf + chunk_offset;
        memcpy(pu->buf, src, LSV_PAGE_SIZE);

#ifdef __PAGE_CRC32__
        {
                /* Debug aid: print the checksum of the source and of the copy. */
                lsv_u32_t crc1 = page_crc32(src);
                lsv_u32_t crc2 = page_crc32(pu->buf);

                PRINT_PAGE_CRC(lba, chunk_id, CHUNKOFF2PAGEIDX(chunk_offset), crc1, crc2);
        }
#endif

        lsv_rcache_pu_set(volume_proto, pu, chunk_id, chunk_offset, lba, LSV_PAGE_SIZE);
        lsv_rcache_page_hash_index_insert(volume_proto, pu);

        return 0;
}

/*
 * Insert one page of a chunk buffer into the page cache.
 *
 * If a page unit with the same LBA is already present in the hash bucket
 * it is cleared and refilled in place; otherwise a unit is obtained from
 * lsv_rcache_pu_get() and filled.
 *
 * @volume_proto: volume owning the read cache.
 * @chunk_id:     chunk the page belongs to.
 * @chunk_offset: byte offset of the page inside buf; must be a multiple
 *                of LSV_PAGE_SIZE.
 * @off:          volume offset of the page, converted with OFF2LBA().
 * @size:         value echoed back to the caller on success.
 * @buf:          chunk buffer the page is copied from.
 *
 * Returns size when the page was cached, or 0 when the page cache is
 * disabled / not set up or no page unit could be obtained.
 *
 * NOTE(review): unlike lsv_rcache_load_page(), this path never assigns
 * pu->status - confirm that is intended for newly allocated units.
 */
int lsv_rcache_pu_insert(lsv_volume_proto_t *volume_proto, lsv_u32_t chunk_id,
                lsv_s32_t chunk_offset, lsv_u64_t off, lsv_s32_t size, void * buf)
{
        lsv_rcache_t *rcache = (lsv_rcache_t *)volume_proto->rcache;
        lsv_rcache_page_unit * pu = NULL;
        struct list_head *pos, *n;
        hash_index_head * hash_index = rcache->page_hash_index;

        YASSERT(chunk_offset % LSV_PAGE_SIZE == 0);

        lsv_u64_t lba = OFF2LBA(off);
        lsv_s32_t hash_key = HASH_KEY(lba);

        /*
         * Look for a page unit with the same LBA in the page index:
         * replace it if found, otherwise insert a new one.
         */
        if (LSV_RCACHE_PAGE_NUM_MAX <= 0 || NULL == hash_index) {
                return 0;
        }

        list_for_each_safe(pos, n, &hash_index[hash_key].list) {
                pu = list_entry(pos, lsv_rcache_page_unit, index_list);
                if (pu->lba == lba) {
                        lsv_rcache_page_list_clear(volume_proto, pu);
                        /* Copy the page out of buf, update pu and re-index it. */
                        lsv_rcache_pu_memcpy(volume_proto, pu, chunk_id, chunk_offset, lba, LSV_PAGE_SIZE, buf);
                        return size;
                }
        }

        /*
         * Not found: insert. The scan above leaves pu pointing at the last
         * bucket entry it examined (a page with a different LBA). Reset it
         * so that a failed lsv_rcache_pu_get() can never be mistaken for
         * success and cause that unrelated page to be overwritten.
         */
        pu = NULL;
        lsv_rcache_pu_get(volume_proto, &pu);
        if (NULL == pu) {
                DINFO("canot get invalid pu to copy pu into page_list!\n");
                return 0;
        }

        /* Copy the page out of buf, update pu and index it. */
        lsv_rcache_pu_memcpy(volume_proto, pu, chunk_id, chunk_offset, lba, LSV_PAGE_SIZE, buf);
        co_cond_broadcast(&rcache->lba_cond, 0);
        return size;
}

/*
 * Read (part of) one page through the page cache and append it to a buffer.
 *
 * The page index is consulted first. On a miss a page unit is obtained
 * (waiting on rcache->lba_cond while none is available), the whole page
 * is read with lsv_rcache_read_data(), the unit is indexed, and the
 * requested bytes are appended to append_buf.
 *
 * @volume_proto: volume owning the read cache; hit/miss counters on it
 *                are updated.
 * @chunk_id:     chunk holding the page; LSV_CHUNK_NULL is rejected.
 * @chunk_offset: byte offset of the page inside the chunk.
 * @off:          volume offset of the first requested byte; the position
 *                inside the page is off % LSV_PAGE_SIZE.
 * @size:         number of bytes to append.
 * @append_buf:   destination buffer.
 *
 * Returns size on success, -EINVAL for a null chunk id, or the non-zero
 * value returned by lsv_rcache_read_data() on a read failure (with
 * __NO_CACHE__ also -ENOMEM when the bounce buffer cannot be allocated).
 */
int lsv_rcache_load_page(lsv_volume_proto_t *volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_offset,
                       lsv_u64_t off, lsv_s32_t size, buffer_t *append_buf){
        int ret = 0;
        lsv_rcache_t *rcache = NULL;
        lsv_rcache_page_unit * pu = NULL;

        uint64_t lba = OFF2LBA(off);
        lsv_s32_t page_offset = off % LSV_PAGE_SIZE;

        rcache = (lsv_rcache_t *)volume_proto->rcache;

        if(LSV_CHUNK_NULL == chunk_id){
                DERROR("lookup rcache , chunk_id is 0, the parameter is wrong!\n");
                ret = -EINVAL;
                return ret;
        }
#ifdef __NO_CACHE__
        /* Cache bypass: read the page into a temporary buffer and return. */
        lsv_s8_t *buf = malloc(LSV_PAGE_SIZE);
        if(NULL == buf){
                DERROR("malloc for page buffer failed\n");
                return -ENOMEM;
        }
        ret = lsv_rcache_read_data(volume_proto, buf, chunk_id, chunk_offset, LSV_PAGE_SIZE);
        if(ret){
                DERROR("call lsv_log_slog_read read chunk_id %d error:%d\n", chunk_id, ret);
                free(buf);
                return ret;
        }
        mbuffer_appendmem(append_buf, buf + page_offset, size);
        free(buf);
        return size;
#endif
        /*
         * Only the page index is consulted (to avoid loading several
         * pages); the chunk and l2_cache indexes are no longer checked.
         */
        ret = lsv_rcache_page_hash_index_lookup(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
        if(ret == size){
                volume_proto->read_rc_hit++;
                rcache->page_hit_cnt ++;
                return size;
        }

        /*
         * TODO: should the page's existing index entry be cleared before
         * inserting, to make sure trylock does not yield?
         */

        volume_proto->read_rc_miss++;
        DINFO("request page not in readcache, call lsv_log_slog_read\n");

        rcache->page_load++;

        /*
         * TODO: take a lock and enter a critical section.
         * If the first task blocks on the lock the index has not been
         * updated yet; would a second task entering now obtain a second
         * pu object? Currently the lock protects the memory object.
         * Locking the logical object (e.g. the LBA) would avoid this
         * problem and would allow the index to be updated afterwards.
         */
        while(1){
                lsv_rcache_pu_get(volume_proto, &pu);
                if(NULL != pu){
                        break;
                }
                co_cond_wait(&rcache->lba_cond);
                /*
                 * TODO: when no pu can be obtained, yield, wait for
                 * another pu to release its lock, then retry.
                 */
        }

        /* Mark the unit busy so lsv_rcache_pu_get() will not hand it out. */
        pu->status = LSV_RCACHE_LOADING;

        ret = lsv_rcache_read_data(volume_proto, pu->buf, chunk_id, chunk_offset, LSV_PAGE_SIZE);
        if(ret){
                DERROR("call lsv_log_slog_read read chunk_id %d error:%d\n", chunk_id, ret);
                /*
                 * Give the unit back for reuse. The LOADING mark must be
                 * dropped first: lsv_rcache_pu_get() skips LOADING units,
                 * so leaving it set would remove this unit from the cache
                 * for good. Wake tasks waiting for a free unit as well.
                 * NOTE(review): assumes lsv_rcache_page_list_clear() is
                 * safe on a unit that is on the LRU but not indexed.
                 */
                pu->status = LSV_RCACHE_IN_USE;
                list_add_tail(&pu->page_lru_hook, &rcache->page_lru);
                co_cond_broadcast(&rcache->lba_cond, 0);
                return ret;
        }

        lsv_rcache_pu_set(volume_proto, pu, chunk_id, chunk_offset, lba, LSV_PAGE_SIZE);
        lsv_rcache_page_hash_index_insert(volume_proto, pu);

        pu->status = LSV_RCACHE_IN_USE;

        mbuffer_appendmem(append_buf, pu->buf + page_offset, size);
#ifdef __PAGE_CRC32__
        PRINT_PAGE_CRC(lba, chunk_id, CHUNKOFF2PAGEIDX(chunk_offset), page_crc32(pu->buf), 0);
#endif

        /* ret is 0 here; wake tasks waiting on lba_cond. */
        co_cond_broadcast(&rcache->lba_cond, ret);
        return size;
}
